package org.andy.zookeeper;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZooKeeperWatcher implements Watcher {

	/** 定义原子变量 */
	AtomicInteger seq = new AtomicInteger();
	/** 定义session失效时间 */
	private static final int SESSION_TIMEOUT = 10000;
	/** zookeeper服务器地址 */
	private static final String CONNECTION_ADDR = "localhost:2181";
	/** zk父路径设置 */
	private static final String PARENT_PATH = "/testWatch";
	/** zk子路径设置 */
	private static final String CHILDREN_PATH = "/testWatch/children";
	/** 进入标识 */
	private static final String LOG_PREFIX_OF_MAIN = "【Main】";
	/** zk变量 */
	private ZooKeeper zk = null;
	/** 信号量设置，用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
	private CountDownLatch connectedSemaphore = new CountDownLatch(1);
	
	/**
	 * 创建链接
	 * @param connectionAddress 链接地址
	 * @param sessionTimeOut   超时时间
	 * @author andy.hu
	 */
	public void createConnection(String connectionAddress, int sessionTimeOut) {
	    this.closeConnection();
	    try {
            zk = new ZooKeeper(connectionAddress, sessionTimeOut, this);
            System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
            connectedSemaphore.await();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
	}
	
	/**
	 * 关闭链接 
	 * @author andy.hu
	 */
    private void closeConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * 创建节点
     * @param path  节点路径
     * @param data  节点数据
     * @return
     * @author andy.hu
     */
    public boolean createPath(String path, String data, boolean needWatch) {
        try {
            this.zk.exists(path, needWatch);
            String result = this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + result + ", content: " + data);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return true;
    }
    
    /**
     * 读取指定节点的数据
     * @param path  节点路径
     * @param needWatch 是否添加watch
     * @return
     * @author andy.hu
     */
    public String readData(String path, boolean needWatch) {
        try {
            System.out.println("---------------------readData");
            return new String(this.zk.getData(path, needWatch, null));
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
            return "";
        } 
    }
    
    /**
     * 更新指定节点数据内容
     * @param path  指定节点路径
     * @param data  数据内容
     * @return
     * @author andy.hu
     */
    public boolean writeData(String path, String data) {
        try {
            Stat result = this.zk.setData(path, data.getBytes(), -1);
            System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功，path：" + path + ", stat: " + result);
        } catch (KeeperException |InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }
    
    /**
     * 删除指定节点
     * @param path  指定节点路径
     * @author andy.hu
     */
    public void deleteNode(String path) {
        try {
            this.zk.delete(path, -1);
            System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功，path：" + path);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 判断指定节点是否存在
     * @param path 节点路径
     */
    public Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    
    /**
     * 获取子节点
     * @param path 节点路径
     */
    private List<String> getChildren(String path, boolean needWatch) {
        try {
            return this.zk.getChildren(path, needWatch);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
    
    public void deleteAllTestPath() {
        try {
            if (this.zk.exists(CHILDREN_PATH, false) != null) {
               this.deleteNode(CHILDREN_PATH); 
            }
            if (this.zk.exists(PARENT_PATH, false) != null) {
                this.deleteNode(PARENT_PATH);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        System.out.println("进入 process 。。。。。event = " + event);
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (null == event) {
            return;
        }
        
        // 链接状态
        KeeperState keeperState = event.getState();
        //事件状态
        EventType eventType = event.getType();
        String path = event.getPath();
        String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
        
        System.out.println(logPrefix + "收到Watcher通知");
        System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
        System.out.println(logPrefix + "事件类型:\t" + eventType.toString());
        if (KeeperState.SyncConnected == keeperState) {
            if (EventType.None == eventType) {
                System.out.println(logPrefix + "成功连接上ZK服务器");
                connectedSemaphore.countDown();
            }else if (EventType.NodeCreated == eventType) {
                System.out.println(logPrefix + "节点创建");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.exists(path, true);
            }else if (EventType.NodeDataChanged == eventType) {
                System.out.println(logPrefix + "节点数据更新");
                System.out.println("in NodeDataChanged ........");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
            }else if (EventType.NodeChildrenChanged == eventType) {
                System.out.println(logPrefix + "子节点变更");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(logPrefix + "子节点列表：" + this.getChildren(PARENT_PATH, true));
            }else if (EventType.NodeDeleted == eventType) {
                System.out.println(logPrefix + "节点 " + path + " 被删除");
            }
        }else if (KeeperState.Disconnected == keeperState) {
            System.out.println(logPrefix + "与ZK服务器断开连接");
        }else if (KeeperState.AuthFailed == keeperState) {
            System.out.println(logPrefix + "权限检查失败");
        }else if (KeeperState.Expired == keeperState) {
            System.out.println(logPrefix + "会话失效");
        }
        
        System.out.println("--------------------------------------------");
    }

	public static void main(String[] args) throws InterruptedException {
	    // 建立watcher
        ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher();
        // 创建链接
        zkWatcher.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
        
        Thread.sleep(1000);
        
        zkWatcher.deleteAllTestPath();
        if (zkWatcher.createPath(PARENT_PATH, String.valueOf(System.currentTimeMillis()), true)) {
            Thread.sleep(1000);
            // 读取数据
            System.out.println("---------------------- read parent ----------------------------");
            zkWatcher.readData(PARENT_PATH, true);
            
            // 读取子节点
            System.out.println("---------------------- read children path ----------------------------");
            zkWatcher.getChildren(PARENT_PATH, true);

            // 更新数据
            zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "");
            
            Thread.sleep(1000);
            
            // 创建子节点
            zkWatcher.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", true);
            
            Thread.sleep(1000);
            
            zkWatcher.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
        }
        
        Thread.sleep(3000);
        // 清理节点
        zkWatcher.deleteAllTestPath();
        Thread.sleep(1000);
        zkWatcher.closeConnection();
    }

}